package org.eocencle.magnet.spark1.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;
import org.eocencle.magnet.core.mapping.DataSourceField;
import org.eocencle.magnet.core.util.CoreTag;
import org.eocencle.magnet.core.util.StrictMap;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * General-purpose utility class for Spark operations.
 * @author: huan
 * @Date: 2020-04-21
 * @Description:
 */
public class SparkUtil {
    /**
     * Reads a text file and converts every line into a typed {@link Row}.
     *
     * <p>Delegates to {@link #createRDD(JavaRDD, String, StrictMap)} after loading the
     * file with {@code context.textFile(src)}.
     *
     * @param context   Spark context used to read the file
     * @param src       path handed to {@code textFile}
     * @param separator column separator; it is passed to {@link String#split(String, int)}
     *                  and is therefore interpreted as a regular expression
     * @param fieldMap  field definitions, one per column
     * @return an RDD holding one {@link Row} per input line
     */
    public static JavaRDD<Row> createRDD(JavaSparkContext context, String src, String separator, StrictMap<DataSourceField> fieldMap) {
        JavaRDD<String> lines = context.textFile(src);
        return createRDD(lines, separator, fieldMap);
    }

    /**
     * Splits each line on {@code separator} and converts the columns to the Java types
     * declared in {@code fieldMap}.
     *
     * <p>The i-th entry produced by {@code fieldMap.entrySet()} is paired with the i-th
     * column of the line. NOTE(review): this assumes the map iterates in column order;
     * confirm against the {@code StrictMap} implementation.
     *
     * <p>Columns beyond {@code fieldMap.size()} are ignored. A line with fewer columns
     * than fields makes the mapping function throw {@link IllegalArgumentException}
     * (raised when the RDD is evaluated, not when this method returns).
     *
     * @param lines     raw text lines
     * @param separator column separator; interpreted as a regular expression by
     *                  {@link String#split(String, int)}
     * @param fieldMap  field definitions, one per column
     * @return an RDD holding one {@link Row} per input line
     */
    public static JavaRDD<Row> createRDD(JavaRDD<String> lines, String separator, StrictMap<DataSourceField> fieldMap) {
        return lines.map((String line) -> {
            // Limit -1 keeps trailing empty columns so positions stay aligned.
            String[] row = line.split(separator, -1);
            Object[] fields = new Object[fieldMap.size()];
            if (row.length < fields.length) {
                // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException("Expected at least " + fields.length
                        + " columns but found " + row.length
                        + " after splitting on '" + separator + "'");
            }
            int i = 0;
            for (Map.Entry<String, DataSourceField> entry : fieldMap.entrySet()) {
                fields[i] = parseValue(row[i], entry.getValue());
                i++;
            }
            return RowFactory.create(fields);
        });
    }

    /**
     * Converts one raw column value to the Java object matching the field's declared type.
     *
     * <p>String columns and columns of an unrecognised type are returned unchanged.
     * For every other handled type a blank value yields {@code null}.
     * NOTE(review): BINARY and CALENDARINTERVAL are accepted by
     * {@link #createDataFrame(SQLContext, StrictMap, JavaRDD)} but are not converted here,
     * so such columns still carry the raw string.
     *
     * @param raw   raw column text
     * @param field definition of the column
     * @return the converted value, possibly {@code null}
     * @throws java.text.ParseException if a date/time value does not match the field's format
     */
    private static Object parseValue(String raw, DataSourceField field) throws java.text.ParseException {
        String type = field.getType();
        if (CoreTag.TABLE_FIELD_TYPE_STRING.equalsIgnoreCase(type)) {
            return raw;
        }

        boolean blank = StringUtils.isBlank(raw);
        if (CoreTag.TABLE_FIELD_TYPE_BOOLEAN.equalsIgnoreCase(type)
                || CoreTag.TABLE_FIELD_TYPE_BOOL.equalsIgnoreCase(type)) {
            return blank ? null : Boolean.valueOf(raw);
        }
        // TIMESTAMP is handled together with DATETIME because createDataFrame declares
        // both as TimestampType; leaving TIMESTAMP as a String would not match the schema.
        if (CoreTag.TABLE_FIELD_TYPE_DATETIME.equalsIgnoreCase(type)
                || CoreTag.TABLE_FIELD_TYPE_TIMESTAMP.equalsIgnoreCase(type)) {
            return blank ? null : parseTimestamp(raw, field.getFormat());
        }
        if (CoreTag.TABLE_FIELD_TYPE_DOUBLE.equalsIgnoreCase(type)) {
            return blank ? null : Double.valueOf(raw);
        }
        if (CoreTag.TABLE_FIELD_TYPE_FLOAT.equalsIgnoreCase(type)) {
            return blank ? null : Float.valueOf(raw);
        }
        if (CoreTag.TABLE_FIELD_TYPE_BYTE.equalsIgnoreCase(type)) {
            return blank ? null : Byte.valueOf(raw);
        }
        if (CoreTag.TABLE_FIELD_TYPE_INTEGER.equalsIgnoreCase(type)
                || CoreTag.TABLE_FIELD_TYPE_INT.equalsIgnoreCase(type)) {
            return blank ? null : Integer.valueOf(raw);
        }
        if (CoreTag.TABLE_FIELD_TYPE_LONG.equalsIgnoreCase(type)) {
            return blank ? null : Long.valueOf(raw);
        }
        if (CoreTag.TABLE_FIELD_TYPE_SHORT.equalsIgnoreCase(type)) {
            return blank ? null : Short.valueOf(raw);
        }
        if (CoreTag.TABLE_FIELD_TYPE_DECIMAL.equalsIgnoreCase(type)) {
            return blank ? null : Decimal.apply(raw);
        }
        return raw;
    }

    /**
     * Parses a non-blank date/time string into a {@link Timestamp}.
     *
     * <p>When {@code format} is non-blank it is used as a {@link SimpleDateFormat} pattern
     * and the timestamp is built directly from the parsed instant, so sub-second precision
     * present in the source is kept. When {@code format} is blank the value is parsed with
     * {@link Timestamp#valueOf(String)} (JDBC escape format).
     *
     * @param raw    non-blank date/time text
     * @param format {@link SimpleDateFormat} pattern, may be blank
     * @return the parsed timestamp
     * @throws java.text.ParseException if {@code raw} does not match {@code format}
     */
    private static Timestamp parseTimestamp(String raw, String format) throws java.text.ParseException {
        if (StringUtils.isBlank(format)) {
            return Timestamp.valueOf(raw);
        }
        // A new formatter per call: SimpleDateFormat is not thread-safe.
        return new Timestamp(new SimpleDateFormat(format).parse(raw).getTime());
    }

    /**
     * Builds a {@link DataFrame} from an RDD of rows using a schema derived from
     * {@code fieldMap}.
     *
     * <p>Every field is declared nullable. Fields whose type is not recognised are
     * declared as {@code NullType}.
     *
     * @param sqlContext SQL context used to create the DataFrame
     * @param fieldMap   field definitions; iterated in {@code entrySet()} order to build the schema
     * @param rdd        rows whose column order matches the schema
     * @return the DataFrame over {@code rdd}
     */
    public static DataFrame createDataFrame(SQLContext sqlContext, StrictMap<DataSourceField> fieldMap, JavaRDD<Row> rdd) {
        List<StructField> structFields = new ArrayList<StructField>();
        for (Map.Entry<String, DataSourceField> entry : fieldMap.entrySet()) {
            DataSourceField field = entry.getValue();
            structFields.add(DataTypes.createStructField(field.getName(), toDataType(field), true));
        }
        StructType structType = DataTypes.createStructType(structFields);
        return sqlContext.createDataFrame(rdd, structType);
    }

    /**
     * Maps a field's declared type name (case-insensitive) to the Spark SQL {@link DataType}.
     *
     * <p>For decimals, a precision string of the form {@code "precision,scale"} selects an
     * explicit decimal type; whitespace around either number is ignored. Any other
     * precision value falls back to the default decimal type.
     *
     * @param field definition of the column
     * @return the Spark SQL type, or {@code NullType} for an unrecognised type name
     */
    private static DataType toDataType(DataSourceField field) {
        String type = field.getType();
        if (CoreTag.TABLE_FIELD_TYPE_STRING.equalsIgnoreCase(type)) {
            return DataTypes.StringType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_BINARY.equalsIgnoreCase(type)) {
            return DataTypes.BinaryType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_BOOLEAN.equalsIgnoreCase(type)
                || CoreTag.TABLE_FIELD_TYPE_BOOL.equalsIgnoreCase(type)) {
            return DataTypes.BooleanType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_DATETIME.equalsIgnoreCase(type)
                || CoreTag.TABLE_FIELD_TYPE_TIMESTAMP.equalsIgnoreCase(type)) {
            return DataTypes.TimestampType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_CALENDARINTERVAL.equalsIgnoreCase(type)) {
            return DataTypes.CalendarIntervalType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_DOUBLE.equalsIgnoreCase(type)) {
            return DataTypes.DoubleType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_FLOAT.equalsIgnoreCase(type)) {
            return DataTypes.FloatType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_BYTE.equalsIgnoreCase(type)) {
            return DataTypes.ByteType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_INTEGER.equalsIgnoreCase(type)
                || CoreTag.TABLE_FIELD_TYPE_INT.equalsIgnoreCase(type)) {
            return DataTypes.IntegerType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_LONG.equalsIgnoreCase(type)) {
            return DataTypes.LongType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_SHORT.equalsIgnoreCase(type)) {
            return DataTypes.ShortType;
        }
        if (CoreTag.TABLE_FIELD_TYPE_DECIMAL.equalsIgnoreCase(type)) {
            if (StringUtils.isNotBlank(field.getPrecision())) {
                String[] precision = field.getPrecision().split(CoreTag.SPLIT_COMMA);
                if (2 == precision.length) {
                    // Trim so that "10, 2" is accepted as well as "10,2".
                    return DataTypes.createDecimalType(
                            Integer.parseInt(precision[0].trim()),
                            Integer.parseInt(precision[1].trim()));
                }
            }
            return DataTypes.createDecimalType();
        }
        return DataTypes.NullType;
    }
}
